package com.echatim.broker;

import com.broker.base.IBrokerEventBus;
import com.broker.base.IBrokerStorage;
import com.broker.base.protocol.request.RequestMessage;
import com.broker.hook.BrokerLinkableHook;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.echatim.broker.localsvc.TopicDispatcher;
import com.echatim.broker.localsvc.TopicSender;
import com.echatim.broker.storage.event.ClientDisconnectMessage;
import com.event.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
 *  broker 业务处理
 *
 * */
@Slf4j
@ConditionalOnProperty(name="echatim.sdk.auth-type", havingValue="community")
@Component
public class AppBrokerHook extends BrokerLinkableHook {
    private TopicDispatcher appDispatcher;
    private TopicSender appSender;
    private EventBus eventBus;
    @Value("${echatim.sdk.stress-test}")
    private Boolean stressTest = false;

    public AppBrokerHook(@Autowired TopicDispatcher appDispatcher, @Autowired TopicSender appSender, @Autowired EventBus eventBus) {
        this.appDispatcher = appDispatcher;
        this.appSender = appSender;
        this.eventBus = eventBus;
    }


    @Override
    public void startup(SocketIOServer socketIOServer, IBrokerStorage IBrokerStorage, IBrokerEventBus eventBus) {
        this.appSender.registerEventBus(eventBus);
        this.appSender.setSocketServer(socketIOServer);
    }

    public void onConnected(SocketIOClient client) {
        String clientId = client.getSessionId().toString();
        log.info(" client:" + clientId + " connected.");
    }

    public void onDisConnected(SocketIOClient client) {
        String clientId = client.getSessionId().toString();
        log.info(" client:" + clientId + " disconnected.");
        eventBus.post(new ClientDisconnectMessage().setClientId(clientId));
    }

    public void onReceiveMessage(SocketIOClient client, RequestMessage message, AckRequest ackSender) {
        // 转发到业务层
        com.broker.base.protocol.response.ResponseMessage responseMessage = this.appDispatcher.consumeMessage(message);
        ackSender.sendAckData(responseMessage);
        // 压测相关功能
        if(stressTest){
            client.sendEvent("topic.test_ack", responseMessage);
        }
        if (this.next != null) {
            this.next.onReceiveMessage(client, message, ackSender);
        }
    }
}
